package com.atguigu.app;

import com.atguigu.bean.UDTFBean;
import com.atguigu.func.TestTableFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo job that exercises a user-defined table function (UDTF) through Flink SQL.
 *
 * <p>Text lines are read from a socket, converted to {@link UDTFBean} records, exposed
 * as the temporary view {@code t1}, and joined laterally against {@code my_udtf}
 * (backed by {@link TestTableFunction}). The query result is printed to stdout.
 */
public class FlinkTestUDTF {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Expected input line format: "<id>,<msg>", e.g. 1001,hello_atguigu:hello_flink
        SingleOutputStreamOperator<UDTFBean> udtfBeanDS = env.socketTextStream("hadoop102", 9999)
                // Drop lines that lack a second comma-separated field (empty line, no comma,
                // or nothing after the comma). Without this guard the split[1] access below
                // throws ArrayIndexOutOfBoundsException and fails the whole streaming job.
                .filter(line -> line.split(",").length >= 2)
                .map(line -> {
                    String[] split = line.split(",");
                    return new UDTFBean(split[0], split[1]);
                });

        tableEnv.createTemporaryView("t1", udtfBeanDS);

        // Register the UDTF under the SQL name "my_udtf".
        tableEnv.createTemporarySystemFunction("my_udtf", TestTableFunction.class);

        // NOTE(review): "id" and "msg" are presumably UDTFBean fields, and "a" / "b" the
        // columns emitted by TestTableFunction - confirm against those classes.
        tableEnv.sqlQuery("" +
                        "SELECT \n" +
                        "    id, \n" +
                        "    a, \n" +
                        "    b \n" +
                        "FROM t1, LATERAL TABLE(my_udtf(msg))")
                .execute()
                .print();

    }

}
